消息队列 RabbitMQ -- 介绍

RabbitMQ 是除了 Qpid 之外, 唯一实现了AMQP(高级消息协议:Advanced Message Queue Protocol)标准的代理服务器。 虽然AMQP像邮箱那样为离线消费者存储消息,但是这些根据标签路由的消息更为灵活。同时和邮件不同的是,这些消息没有固定的结构,甚至于可以直接存储二进制数据。不同于IM协议,AMQP隐去了消息的发送方和接受方。AMQP也没有“存在”这个概念。 对负载均衡来说,队列是绝佳方案.

目录

生产者创建消息(消息包含两部分: 有效载荷[payload] 和 标签[label]),然后发布(Publish) 到代理服务器(RabbitMQ)

消费者连接到代理服务器,并订阅到队列上。当消费者接受到消息时,它只得到了消息的一部分:有效载荷。在消息路由过程中,消息的标签并没有随有效载荷一同传递。 如果需要明确知道谁生产的AMQP消息的话,就要看生产者是否把消息方信息放入有效载荷中。

AMQP栈

AMQP消息路由必须有三个部分: 交换器、队列、绑定。 生产者把消息发布到交换器上;消息最终到达队列,并被消费者解释;绑定决定了消息如何从路由器 路由到特定队列。

消费者和生产者到底谁去创建队列?

如果你不能承担得起消息进入“黑洞”而丢失的话,你的生产者和消费者就都应该尝试去创建队列。

生产者创建消息

        import "github.com/streadway/amqp"
        // step 1. 创建 connection
        conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
        // step 2. 获取 (信道)channel
        channel, err := conn.Channel()
        // step 3. 在信道上声明交换器 exchange
        channel.ExchangeDeclare(
            "exchange_name",     // exchange name
            amqp.ExchangeDirect, // exchange type
            false,               // durable
            false,               // autoDelete
            false,               // internal
            false,               // noWait
            nil,                 // args amqp.Table
        )
        // step 4. 声明队列
        queue, err := ch.QueueDeclare(
                "queue_name",    // queue name
                false,           // durable
                false,           // delete when unused
                false,           // exclusive
                false,           // no-wait
                nil,             // arguments amqp.Table
        )
        // step 5. 将队列绑定根据路由键绑定到交换器上
        channel.QueueBind(
            queue.Name,          // queue name
            "route_key",         // route key
            "exchange_name",     // exchange name
            false,               // no-wait
            nil,                 // arguments amqp.Table
        )
        // step 6. 将消息发送到交换器上,交换器会根据路由键将消息发送到对应的队列queue
        channel.Publish(
                "exchange_name", // exchange
                "route_key",     // routing key
                false,           // mandatory
                false,           // immediate
                amqp.Publishing{ // message
                    ContentType: "text/plain",
                    Body:        []byte("hello world"),
        })

消费者订阅消息

       import "github.com/streadway/amqp"
       // step 1. 创建 connection
       conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
       // step 2. 获取 (信道)channel
       channel, err := conn.Channel()
       // step 3. 在信道上声明交换器 exchange
       channel.ExchangeDeclare(
           "exchange_name",     // exchange name
           amqp.ExchangeDirect, // exchange type
           false,               // durable
           false,               // autoDelete
           false,               // internal
           false,               // noWait
           nil,                 // args amqp.Table
       )
       // step 4. 声明队列
       queue, err := ch.QueueDeclare(
               "queue_name",    // queue name
               false,           // durable
               false,           // delete when unused
               false,           // exclusive
               false,           // no-wait
               nil,             // arguments amqp.Table
       )
       // step 5. 将队列绑定根据路由键绑定到交换器上
       channel.QueueBind(
           queue.Name,          // queue name
           "route_key",         // route key
           "exchange_name",     // exchange name
           false,               // no-wait
           nil,                 // arguments amqp.Table
       )
       forever := make(chan bool)
       // step 6. 在信道上订阅队列
       messages, err := channel.Consume(
           queue.Name,           // queue
           "",                   // consumer
           false,                // autoAck
           false,                // exclusive
           false,                // noLocal :The noLocal flag is not supported by RabbitMQ.
           false,                // no-wait
           nil,                  // arguments amqp.Table
       )
      // 开启一个 goruntine 获取消息内容 
       go func() {
           for message := range messages {
                   log.Printf("Received a message: %s", message.Body)
                   message.Ack(true)
           }
	    }()
       fmt.Println("Please ctrl+c to stop")
       <-forever
# mq 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×